pyspark的dataframe的单条件、多条件groupBy用法agg 您所在的位置:网站首页 pyspark groupby 组内排序 pyspark的dataframe的单条件、多条件groupBy用法agg

pyspark的dataframe的单条件、多条件groupBy用法agg

2024-02-26 05:34| 来源: 网络整理| 查看: 265

pyspark groupBy方法中用到的知识点 智能搜索引擎 实战中用到的pyspark知识点总结sum和udf方法计算平均得分avg方法计算平均得分count方法计算资源个数collect_list() 将groupBy 的数据处理成列表max取最大值min取最小值多条件groupBy求和sum

智能搜索引擎 实战中用到的pyspark知识点总结

项目中,先配置了spark,通过spark对象连接到hive数据库,在 hive数据库中以dataframe的形式获取数据,使用pyspark的dataframe的相关方法操作数据,最后将整理好的数据写入hive表存入数据库,该篇介绍项目中使用到的groupBy,agg的相关方法。

sum和udf方法计算平均得分

数据介绍:主播表,一个主播有很多直播课,每个直播课都有一个总评分和评分人数,现要计算出该主播的平均得分。

思路:单个主播的所有的直播课的分数全部加起来,除以所有直播课的共同的评分人数。

from pyspark.sql.types import DoubleType from pyspark.sql.functions import udf def Spark(): """ spark配置类 """ sp = Spark() spark = sp.spark df = spark.sql("select anchor_id,live_score,live_comment_count from table_anchor") df = df.groupBy('anchor_id') .agg({"live_score": "sum", "live_comment_count": "sum"}) .withColumnRenamed("sum(live_score)", "total_score") .withColumnRenamed("sum(live_comment_count)", "total_people")

现在就获得了两列 总分数和总人数 使用udf方法自定义函数即可求的平均分

def avg_score(score,people): try: if int(people) != 0: return int(score)/int(people) else: return 0 except: return 0 func = udf(avg_score,DoubleType()) df = df.withColumn("score", func(df.total_score,df.total_people)) df = df.select("anchor_id","score") # 选择ID和分数两列即可存表 avg方法计算平均得分

数据介绍:有一个课程评分的记录表,表项为课程ID,用户ID,该用户对课程的评分,现计算该课程的平均评分

思路:使用 avg方法直接计算。

df = spark.sql("select course_id,score from table_course") df = df.groupBy("course_id") .agg({"score": "avg"}) .withColumnRenamed("avg(score)", "avg_score") df = df.select("course_id","avg_score")

获取course_id 和avg_score 之后即可存表 注:spark为上一节中spark

count方法计算资源个数

数据介绍:有一个课程评分的记录表,表项为课程ID,用户ID,该用户对课程的评论,现计算该课程有多少人评论

df = spark.sql("select course_id,comment from table_course") df = df.groupBy("course_id") .agg({"comment": "count"}) .withColumnRenamed("count(comment)", "comment_count") df = df.select("course_id","comment_count")

获取course_id 和comment_count 之后即可存表 注:spark为第一节中spark

collect_list() 将groupBy 的数据处理成列表

数据介绍:有一个课程评分的记录表,表项为课程ID,用户ID,该用户对课程的评论,现将用户评论放在列表中,便于后续处理

from pyspark.sql.functions import collect_list df = spark.sql("select course_id,comment from table_course") df = df.groupBy("course_id") .agg(collect_list("comment")) .withColumnRenamed("collect_list(comment)", "comment_list") df = df.select("course_id","comment_list")

获取course_id 和comment_list 之后即可存表,或者对comment_list使用udf自定义函数,将数据组拼接成字符串。 注:spark为第一节中spark, 注意import包

max取最大值min取最小值

数据介绍:供应商发布课程表,供应商发布了很多课程,发布的每个课程都有发布时间,在数据库中是timestamp类型,数据表中表项为org_id, course_id, publish_date,现要获取供应商发布课程的最新时间作为该供应商的最新活跃时间。

思路:timestamp类型的数据也可使用max

df = spark.sql("select org_id,course_id,publish_date from org_table") df = df.groupBy("org_id") .agg({"publish_date": "max"}) .withColumnRenamed("max(publish_date)", "active_time") df = df.select("org_id","active_time")

获取org_id 和active_time 之后即可存表。 注:spark为第一节中spark

多条件groupBy求和sum

数据介绍:一张企业表,企业下有很多职位,不同的企业可能有相同的职位,基于每个企业下的每个职位,统计了该职位某个热搜词被检索的次数,现需要根据职位类别统计各个热搜词被检索的总次数。

企业ID职位ID热搜词该热搜词被检索次数org_idpos_idwordword_countA1pos001保险金融1A1pos002保险金融2A1pos003保险金融3A1pos004保险金融4A2pos001保险金融5A2pos001教育6A2pos003保险金融7A2pos003智能8

思路:

df = spark.sql("select pos_id,word,word_count from org_table") df = df.groupBy("pos_id","word") .agg({"word_count": "sum"}) .withColumnRenamed("sum(word_count)", "total_count") df = df.select("pos_id","word","total_count")

根据职位类别ID和热搜词两个条件聚合数据,将相同职位ID和相同热搜词的表项聚合在一起,将聚合在一起的词频累加。 注:spark为第一节中spark

结果:

职位ID热搜词该热搜词被检索次数pos_idwordword_countpos001保险金融6pos002保险金融2pos003保险金融10pos004保险金融4pos001教育6pos003智能8

此时如若需要对某个职位下的所有的热搜词按频率排序,然后只保留top5,则后续工作可以使用udf自定义函数的方法,将词和词频整合成字典或者列表,然后按照pos_id聚合再一起,用sorted按词频排序。

本文部分知识点参考链接



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有